还在用传统 @Async 注解?试试这个升级版本吧,让异步任务无懈可击!
The following article is from geekhalo Author geekhalo
1. 概览
Spring 的 @Async 注解,想必大家都非常熟悉,只需在方法上增加 @Aysnc ,便可以将其转化为异步操作,任务在后台线程池中运行。
由于数据存储于内存,服务重启存在任务丢失问题,所以,只适用于要求不太严谨的业务,对于要求严格的场景,只能另选方案。
1.1. 背景
在日常开发过程中,像记录日志这种非核心业务,才允许使用 Spring 的 Async 进行异步化,其他场景需要使用更加完备的 MQ 方案。
面对这种场景,免不了一顿编码、一通测试,咱们的时间就这样没有了。对于这种纯纯的技术需求,封装框架是投入产出比最高的事。
1.2. 目标
期望框架能够提供:
不需要 Coding,直接将一个方法转变为 MQ 的异步处理; 支持 顺序消息 特性,以处理对顺序有依赖的场景; 发送,消费可以分离,能够在不同的集群中完成,以更好的支持资源隔离;
2. 快速入门
框架基于 RocketMQ 进行构建,请自行完成 RocketMQ 的搭建。
2.1. 引入 RocketMQ
我们使用 rocketmq starter 完成基本配置。
首先,在 pom 中增加 rocketmq starter 依赖,具体如下:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
其次,在 application.yml 中添加 rocketmq 配置,具体如下:
rocketmq:
name-server: http://127.0.0.1:9876
producer:
group: async-demo
其中,name-server 根据具体情况进行配置。
配置完成,可以在项目中:
注入 RocketMQTemplate 进行消息发送; 使用 @RocketMQMessageListener 标记处理方法,进行消息消费;
2.2. 添加 lego-starter 依赖
为了方便与 spring-boot 项目集成,lego 提供 lego-starter,以完成快速接入。
在 pom 中增加 starter,具体如下:
<dependency>
<groupId>com.geekhalo.lego</groupId>
<artifactId>lego-starter</artifactId>
<version>0.1.4-async_based_rocketmq-SNAPSHOT</version>
</dependency>
其中,自动配置机制将完成:
为 @AsyncBasedRocketMQ 注解方法,增加消息拦截,并启动 并行消费者 进行消息消费; 为 @AsyncForOrderedBasedRocketMQ 注解方法,增加消息拦截,并启动 顺序消费者进行消息消费;
2.3. 并行消息异步处理
我们只需在方法上添加 @AsyncBasedRocketMQ 注解,完成基础配置,该方法便具有异步处理能力。具体如下:
@AsyncBasedRocketMQ(topic = "${async.test.normal.topic}",
tag = "asyncTest1",
consumerGroup = "${async.test.normal.group1}")
public void asyncTest1(Long id, String name, AsyncInputBean bean){
log.info("receive data id {}, name {}, bean", id, name, bean);
CallData callData = new CallData(id, name, bean);
this.callDatas.add(callData);
}
@AsyncBasedRocketMQ 定义如下:
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface AsyncBasedRocketMQ {
/**
* MQ topic
* @return
*/
String topic();
/**
* MQ tag
* @return
*/
String tag();
/**
* 消费组
* @return
*/
String consumerGroup();
/**
* nameServer 配置
* @return
*/
String nameServer() default "${rocketmq.name-server:}";
/**
* 消费者运行的 profile,主要用于发送和消费分离的场景
* @return
*/
String consumerProfile() default "";
}
在 application 文件中增加相关配置,具体如下:
async:
test:
normal:
topic: normal-async-test-topic
group1: normal-async-test-group1
group2: normal-async-test-group2
写一个简单的单测,代码如下:
@Test
public void asyncTest1() throws InterruptedException {
asyncService.getCallDatas().clear();;
Long id = RandomUtils.nextLong();
String name = String.valueOf(RandomUtils.nextLong());
AsyncInputBean bean = createAsyncInputBean();
asyncService.asyncTest1(id, name, bean);
{
List<AsyncService.CallData> callDatas = this.asyncService.getCallDatas();
Assertions.assertTrue(CollectionUtils.isEmpty(callDatas));
}
TimeUnit.SECONDS.sleep(2);
{
List<AsyncService.CallData> callDatas = this.asyncService.getCallDatas();
Assertions.assertFalse(CollectionUtils.isEmpty(callDatas));
AsyncService.CallData callData = callDatas.get(0);
Assertions.assertEquals(id, callData.getId());
Assertions.assertEquals(name, callData.getName());
Assertions.assertEquals(bean, callData.getBean());
}
}
运行单测,日志如下:
[ main] c.g.l.c.a.normal.NormalAsyncInterceptor : After serialize, data is xxxxx
[ main] c.g.l.c.a.normal.NormalAsyncInterceptor : success to send async Task to RocketMQ, args is xxxx, msg is yyyy, result is zzz
[MessageThread_1] com.geekhalo.lego.async.AsyncService : receive data id 8926281443373242368, name 1130519434586076160, bean
[MessageThread_1] g.l.c.a.s.AbstractAsyncConsumerContainer : consume data MessageExt xxxx, cost: 31 ms
为了方便,对部分日志进行简化,但不影响分析结果。
从运行日志可以得出:
NormalAsyncInterceptor 运行在主线程中,主要完成:
对参数进行序列化 将序列化结果发送至 Rocketmq
业务服务AsyncService 运行在消费线程,主要完成:
调用业务方法 打印消费信息
2.4. 顺序消息异步处理
RocketMQ 支持顺序消息,通过指定 hashKey 可以保障相同 hashKey的 Message 路由到同一线程,以模拟顺序消费。
如果需要使用顺序消息,只需使用 @AsyncForOrderedBasedRocketMQ 即可,具体如下:
@AsyncForOrderedBasedRocketMQ(topic = "${async.test.order.topic}",
tag = "asyncTest1",
shardingKey = "#id",
consumerGroup = "${async.test.order.group1}")
public void asyncTestForOrder1(Long id, String name, AsyncInputBean bean){
log.info("receive data id {}, name {}, bean {}", id, name, bean);
CallData callData = new CallData(id, name, bean);
this.callDatas.add(callData);
}
其中,shardingKey = "#id" 含义为,将参数 id 的值作为 shardingKey。
与 AsyncBasedRocketMQ 相比,核心配置不变,只增加 shardingKey 配置,具体定义如下:
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface AsyncForOrderedBasedRocketMQ {
/**
* MQ topic
* @return
*/
String topic();
/**
* MQ tag
* @return
*/
String tag();
/**
* 顺序消费设置的 hashKey
* @return
*/
String shardingKey();
/**
* 消费组
* @return
*/
String consumerGroup();
/**
* nameServer 配置
* @return
*/
String nameServer() default "${rocketmq.name-server:}";
/**
* 消费者运行的 profile,主要用于发送和消费分离的场景
* @return
*/
String consumerProfile() default "";
}
在 application.yml 增加相关配置,具体如下:
async:
test:
order:
topic: order-async-test-topic
group1: order-async-test-group1
group2: order-async-test-group2
编写单元测试用例,具体如下:
@Test
public void asyncForOrderTest1() throws InterruptedException {
List<InputData> inputDatas = new ArrayList<>();
Long[] ids = new Long[]{RandomUtils.nextLong(), RandomUtils.nextLong(), RandomUtils.nextLong(), RandomUtils.nextLong()};
String name = String.valueOf(RandomUtils.nextLong());
AsyncInputBean bean = createAsyncInputBean();
asyncService.getCallDatas().clear();
asyncService.asyncTestForOrder1(ids[0], name, bean);
inputDatas.add(new InputData(ids[0], name, bean));
{
List<AsyncService.CallData> callDatas = this.asyncService.getCallDatas();
Assertions.assertTrue(CollectionUtils.isEmpty(callDatas));
}
for (int i = 0; i< 100; i++) {
name = String.valueOf(RandomUtils.nextLong());
bean = createAsyncInputBean();
asyncService.asyncTestForOrder1(ids[i%ids.length], name, bean);
inputDatas.add(new InputData(ids[i%ids.length], name, bean));
}
TimeUnit.SECONDS.sleep(10);
{
List<AsyncService.CallData> callDatas = this.asyncService.getCallDatas();
Assertions.assertFalse(CollectionUtils.isEmpty(callDatas));
Assertions.assertEquals(inputDatas.size(), callDatas.size());
Map<Long, List<AsyncService.CallData>> callDataMap = callDatas.stream().collect(Collectors.groupingBy(AsyncService.CallData::getId));
Map<Long, List<InputData>> inputDataMap = inputDatas.stream().collect(Collectors.groupingBy(InputData::getId));
for (Long id : ids){
List<AsyncService.CallData> callDataToCheck = callDataMap.get(id);
List<InputData> inputDataToCheck = inputDataMap.get(id);
Assertions.assertEquals(callDataToCheck.size(), inputDataToCheck.size());
for (int j = 0; j < callDataToCheck.size(); j++) {
AsyncService.CallData callData = callDataToCheck.get(j);
InputData inputData1 = inputDataToCheck.get(j);
Assertions.assertEquals(inputData1.getId(), callData.getId());
Assertions.assertEquals(inputData1.getName(), callData.getName());
Assertions.assertEquals(inputData1.getBean(), callData.getBean());
}
}
}
}
运行测试用例,观察日志如下:
[ main] c.g.l.c.a.order.OrderedAsyncInterceptor : After serialize, data is xxx
[ main] c.g.l.c.a.order.OrderedAsyncInterceptor : success to send orderly async Task to RocketMQ, args is xxx , shardingKey is 6723772904149174272, msg is yyy, result is zzz
[ main] c.g.l.c.a.order.OrderedAsyncInterceptor : After serialize, data xxx
[ main] c.g.l.c.a.order.OrderedAsyncInterceptor : success to send orderly async Task to RocketMQ, args is xxx , shardingKey is 6723772904149174272, msg is yyy, result is zzz
[ main] c.g.l.c.a.order.OrderedAsyncInterceptor : After serialize, data is xxx
[MessageThread_1] com.geekhalo.lego.async.AsyncService : receive data id 6723772904149174272, name 8410395540617317376, bean AsyncInputBean(id=81325280405335040, name=1950309494, age=976367396)
[MessageThread_1] g.l.c.a.s.AbstractAsyncConsumerContainer : consume data MessageExt xxx, cost: 1 ms
[ main] c.g.l.c.a.order.OrderedAsyncInterceptor : success to send orderly async Task to RocketMQ, args is xxx , shardingKey is 8896761273036908544, msg is zzz
[ main] c.g.l.c.a.order.OrderedAsyncInterceptor : After serialize, data is xxx
[MessageThread_1] com.geekhalo.lego.async.AsyncService : receive data id 6723772904149174272, name 693725382660268032, bean AsyncInputBean(id=1379620281334973440, name=1090615484, age=1421031650)
[MessageThread_1] g.l.c.a.s.AbstractAsyncConsumerContainer : consume data MessageExt zzz, cost: 0 ms
[MessageThread_2] com.geekhalo.lego.async.AsyncService : receive data id 8896761273036908544, name 7307088811299682304, bean AsyncInputBean(id=594404553604282368, name=812325506, age=1784532908)
[ main] c.g.l.c.a.order.OrderedAsyncInterceptor : success to send orderly async Task to RocketMQ, args is xxx, shardingKey is 717741895048495104, msg is zzz
[MessageThread_2] g.l.c.a.s.AbstractAsyncConsumerContainer : consume data MessageExt xxx, cost: 0 ms
从日志上可见:
主线程 和 消费线程 交叉输出日志; 拦截器 OrderedAsyncInterceptor 基于 sharding key 向 RocketMQ 发送顺序消息; AbstractAsyncConsumerContainer 对顺序消息进行消费;
2.5. 发送和消费分离
有时为了更好的对资源进行隔离,会单独部署一组集群,用于处理后台任务。
为支持该模式,AsyncBasedRocketMQ 和 AsyncForOrderedBasedRocketMQ 都提供了 consumerProfile 配置,用于指定 Consumer 在哪个 profile 下执行,如果不设置,则对环境不进行任何要求。
3.设计&扩展
3.1. 核心设计
在方法上添加注解后,框架自动完成:
增加 AsyncInterceptor Bean,用于对方法进行拦截; 启动 MQConsumer 监听消息变更,并调用 业务方法;
3.2. 核心流程
核心流程如下:
方法被调用,被 AsyncInterceptor 拦截;
首先,对调用参数进行序列化; 然后,将信息封装为 Message 最后,将Message发送至 RocketMQ
消息在RocketMQ进行存储,并投放至 Consumer;
MQPushConsumer,监听消息,并完成业务操作;
Consumer 获得 Message 信息 将消息进行反序列化,获得调用参数 使用调用参数调用业务方法
4. 项目信息
项目仓库地址:https://gitee.com/litao851025/lego
项目文档地址:https://gitee.com/litao851025/lego/wikis/support/asyncBasedRocketMQ
微信8.0将好友放开到了一万,小伙伴可以加我大号了,先到先得,再满就真没了
扫描下方二维码即可加我微信啦,2022,抱团取暖,一起牛逼。